In [1]:
from elasticsearch import Elasticsearch, helpers, exceptions as es_exceptions
from elasticsearch.helpers import bulk,scan
# Shared client used by every cell below: talks to the Elasticsearch node at
# atlas-kibana.mwt2.org:9200 with timeout=60.
es = Elasticsearch(
    hosts=['atlas-kibana.mwt2.org:9200'],
    timeout=60,
)
In [2]:
# Sanity check that the client can reach the cluster: request its health
# report, which is displayed as this cell's output.
es.cluster.health()
Out[2]:
In [3]:
import random

# Toy data set: N_EVENTS fake "events", each with an energy E drawn from
# Gauss(mean=1000, sigma=30) and a pT drawn from Gauss(mean=100, sigma=50).
# A dedicated, seeded generator makes the data (and therefore the query
# results further down) identical on every Restart & Run All, without
# touching the global `random` state.
N_EVENTS = 1000
EVENT_SEED = 42
_event_rng = random.Random(EVENT_SEED)

# Dict values are evaluated left to right, so E is drawn before pT for
# each event.
allEvents = [
    {'eventnr': e, 'E': _event_rng.gauss(1000., 30.), 'pT': _event_rng.gauss(100., 50.)}
    for e in range(N_EVENTS)
]
In [4]:
from time import time

# Baseline measurement: index the events one request per document and report
# the wall-clock time. The position of each event in allEvents is used as its
# document id.
# NOTE(review): es.create is expected to reject an id that already exists, so
# re-running this cell without deleting 'my_events' first will likely fail --
# confirm against the client documentation.
start = time()
for doc_id, doc in enumerate(allEvents):
    es.create(index='my_events', doc_type='event', id=doc_id, body=doc)
elapsed = time() - start
print("it took", elapsed, "seconds.")
In [5]:
# Start from a clean slate: drop any existing 'my_events' index.
try:
    es.indices.delete(index='my_events')
except es_exceptions.NotFoundError:
    # A missing index is the only failure that is expected and safe to ignore;
    # anything else (connection problems, Ctrl-C, ...) is allowed to surface
    # instead of being misreported as "not there?".
    print("not there?")

# Index the same events through the bulk helper and time it.
start = time()
try:
    res = helpers.bulk(es, allEvents, index='my_events', doc_type='event', raise_on_exception=True, request_timeout=60)
except es_exceptions.ConnectionError as e:
    print('ConnectionError ', e)
except es_exceptions.TransportError as e:
    print('TransportError ', e)
except helpers.BulkIndexError as e:
    # Exceptions are not subscriptable on Python 3, so read the summary
    # message and the list of per-document errors from e.args instead of
    # e[0] / e[1].
    print(e.args[0])
    for i in e.args[1]:
        print(i)
except Exception as e:
    print('Something seriously wrong happened.', e)
print("it took", time()-start, "seconds.")
In [6]:
# Fetch up to 15 events that satisfy both range conditions:
# 100 <= pT < 120 and E >= 200.
pt_window = {'range': {'pT': {'gte': 100, 'lt': 120}}}
energy_floor = {"range": {"E": {'gte': 200}}}
my_query = {
    "size": 15,
    "query": {
        "bool": {
            "must": [pt_window, energy_floor],
        },
    },
}

res = es.search(index='my_events', body=my_query)
for hit in res['hits']['hits']:
    print(hit)
In [7]:
# Ask Elasticsearch to histogram the pT field in bins of width 50 and print
# the aggregation part of the response.
pt_histogram = {"field": "pT", "interval": 50}
my_query = {
    "aggs": {
        "pt_bins": {"histogram": pt_histogram},
    },
}

res = es.search(index='my_events', body=my_query)
print(res['aggregations'])
In [8]:
# Clean up: remove the demo index; the call's return value is shown as this
# cell's output.
es.indices.delete(index='my_events')
Out[8]:
In [ ]: